package app.dwd;


import app.BaseSQLApp;
import common.Constant;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import utils.SQLUtil;

/**
 * Flink SQL job that selects the {@code cart_info} rows of the {@code edu} database from
 * {@code ods_db} and inserts them into the {@code dwd_trade_cart_add} table, whose connector
 * clause is supplied by {@code SQLUtil.getKafkaSink(Constant.TOPIC_DWD_TRADE_CART_ADD)}.
 *
 * <p>NOTE(review): rows are selected by database and table name only. If {@code ods_db}
 * carries change-log rows of several operation kinds, all of them are forwarded - confirm
 * that this is intended.
 *
 * @Author lzc
 * @Date 2022/10/10 14:26
 */
public class Dwd_02_DwdTradeCartAdd extends BaseSQLApp {

    /** Name handed to both {@code init(...)} and {@code readOdsDb(...)}. */
    private static final String APP_NAME = "Dwd_02_DwdTradeCartAdd";

    /** Table that receives the selected rows. */
    private static final String SINK_TABLE = "dwd_trade_cart_add";

    /** Projection of id, user_id and ts for edu.cart_info rows found in ods_db. */
    private static final String CART_INFO_QUERY =
            "select "
                    + " `data`['id'] id, "
                    + " `data`['user_id'] user_id, "
                    + " ts "
                    + "from ods_db "
                    + "where `database`='edu' "
                    + "and `table`='cart_info' ";

    /** Column list of the sink table; the connector clause is appended at runtime. */
    private static final String SINK_DDL_HEAD =
            "create table " + SINK_TABLE + "("
                    + "id string,  "
                    + "user_id string,  "
                    + "ts bigint  "
                    + ")";

    /**
     * Starts the job.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Dwd_02_DwdTradeCartAdd app = new Dwd_02_DwdTradeCartAdd();
        // The meaning of the two numeric arguments is defined by BaseSQLApp.init,
        // which is not part of this file.
        app.init(3002, 2, APP_NAME);
    }

    /**
     * Builds the pipeline: read {@code ods_db}, select the cart rows, write them out.
     *
     * @param env  the stream execution environment (not used by this job)
     * @param tEnv the table environment all SQL statements run against
     */
    @Override
    protected void handle(StreamExecutionEnvironment env,
                          StreamTableEnvironment tEnv) {
        // 1. Consume ods_db (declared through DDL by the base class helper).
        //    For ad-hoc inspection one can run "select * from ods_db" with
        //    execute().print(), but print() in SQL is a blocking call.
        readOdsDb(tEnv, APP_NAME);

        // 2. Pick the cart rows out of ods_db and expose them as a view.
        Table cartRows = selectCartRows(tEnv);
        tEnv.createTemporaryView("cart_info", cartRows);

        // 3. Write the rows out.
        createSinkTable(tEnv);
        cartRows.executeInsert(SINK_TABLE);
    }

    /** Runs the cart_info selection against {@code ods_db} and returns the resulting table. */
    private static Table selectCartRows(StreamTableEnvironment tEnv) {
        return tEnv.sqlQuery(CART_INFO_QUERY);
    }

    /** Declares the sink table, appending the connector clause produced by {@code SQLUtil}. */
    private static void createSinkTable(StreamTableEnvironment tEnv) {
        String connectorClause = SQLUtil.getKafkaSink(Constant.TOPIC_DWD_TRADE_CART_ADD);
        tEnv.executeSql(SINK_DDL_HEAD + connectorClause);
    }
}
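/*

 Filtering for cart-add events:
        database = edu
        table = cart_info
        type = insert or update

        sku_1      2     insert (new row)
        sku_1      5-2   update (quantity raised from 2 to 5, so 3 were added)
        i.e. either an insert, or an update in which sku_num becomes larger

 */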